{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Copyright 2019 The Kubeflow Authors. All Rights Reserved.\n",
    "#\n",
    "# Licensed under the Apache License, Version 2.0 (the \"License\");\n",
    "# you may not use this file except in compliance with the License.\n",
    "# You may obtain a copy of the License at\n",
    "#\n",
    "#      http://www.apache.org/licenses/LICENSE-2.0\n",
    "#\n",
    "# Unless required by applicable law or agreed to in writing, software\n",
    "# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
    "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
    "# See the License for the specific language governing permissions and\n",
    "# limitations under the License.\n",
    "# =============================================================================="
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Lightweight Python Components\n",
    "\n",
    "To build a component, define a standalone python function and then call `kfp.components.func_to_container_op(func)` to convert your function to a component that can be used in a pipeline.\n",
    "\n",
    "There are several requirements for the function:\n",
    "\n",
    "- The function should be standalone. It should not use any code declared outside of the function definition. Any imports should be added inside the main function. Any helper functions must be defined inside the main function.\n",
    "- The function can only import packages that are available in the base image. If you need to import a package that's not available, you can try to find a container image that already includes the required packages.\n",
    "- **If the function operates on numbers, the parameters need to have type hints. Supported types are [int, float, bool]. Everything else is passed as string.**\n",
    "- To build a component with multiple output values, use the typing.NamedTuple type hint syntax: `NamedTuple('MyFunctionOutputs', [('output_name_1', type), ('output_name_2', float)])`"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import kfp\n",
    "import kfp.gcp as gcp\n",
    "import kfp.dsl as dsl\n",
    "import kfp.compiler as compiler\n",
    "import kfp.components as comp\n",
    "\n",
    "import kubernetes as k8s"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "tags": [
     "parameter"
    ]
   },
   "outputs": [],
   "source": [
    "# Required Parameters\n",
    "PROJECT_ID='<ADD GCP PROJECT HERE>'\n",
    "GCS_BUCKET='gs://<ADD STORAGE LOCATION HERE>'"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create client\n",
    "\n",
    "If you run this notebook **outside** of a Kubeflow cluster, run the following command:\n",
    "- `host`: The URL of your Kubeflow Pipelines instance, for example \"https://`<your-deployment>`.endpoints.`<your-project>`.cloud.goog/pipeline\"\n",
    "- `client_id`: The client ID used by Identity-Aware Proxy\n",
    "- `other_client_id`: The client ID used to obtain the auth codes and refresh tokens.\n",
    "- `other_client_secret`: The client secret used to obtain the auth codes and refresh tokens.\n",
    "\n",
    "```python\n",
    "client = kfp.Client(host, client_id, other_client_id, other_client_secret)\n",
    "```\n",
    "\n",
    "If you run this notebook **within** a Kubeflow cluster, run the following command:\n",
    "```python\n",
    "client = kfp.Client()\n",
    "```\n",
    "\n",
    "You'll need to create OAuth client ID credentials of type `Other` to get `other_client_id` and `other_client_secret`. Learn more about [creating OAuth credentials](\n",
    "https://cloud.google.com/iap/docs/authentication-howto#authenticating_from_a_desktop_app)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "tags": [
     "parameter"
    ]
   },
   "outputs": [],
   "source": [
    "# Optional Parameters, but required for running outside Kubeflow cluster\n",
    "\n",
    "# The host for 'AI Platform Pipelines' ends with 'pipelines.googleusercontent.com'\n",
    "# The host for pipeline endpoint of 'full Kubeflow deployment' ends with '/pipeline'\n",
    "# Examples are:\n",
    "# https://7c021d0340d296aa-dot-us-central2.pipelines.googleusercontent.com\n",
    "# https://kubeflow.endpoints.kubeflow-pipeline.cloud.goog/pipeline\n",
    "HOST = '<ADD HOST NAME TO TALK TO KUBEFLOW PIPELINE HERE>'\n",
    "\n",
    "# For 'full Kubeflow deployment' on GCP, the endpoint is usually protected through IAP, therefore the following \n",
    "# will be needed to access the endpoint.\n",
    "CLIENT_ID = '<ADD OAuth CLIENT ID USED BY IAP HERE>'\n",
    "OTHER_CLIENT_ID = '<ADD OAuth CLIENT ID USED TO OBTAIN AUTH CODES HERE>'\n",
    "OTHER_CLIENT_SECRET = '<ADD OAuth CLIENT SECRET USED TO OBTAIN AUTH CODES HERE>'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# This is to ensure the proper access token is present to reach the end point for 'AI Platform Pipelines'\n",
    "# If you are not working with 'AI Platform Pipelines', this step is not necessary\n",
    "! gcloud auth print-access-token"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create kfp client\n",
    "in_cluster = True\n",
    "try:\n",
    "  k8s.config.load_incluster_config()\n",
    "except:\n",
    "  in_cluster = False\n",
    "  pass\n",
    "\n",
    "if in_cluster:\n",
    "    client = kfp.Client()\n",
    "else:\n",
    "    if HOST.endswith('googleusercontent.com'):\n",
    "        CLIENT_ID = None\n",
    "        OTHER_CLIENT_ID = None\n",
    "        OTHER_CLIENT_SECRET = None\n",
    "\n",
    "    client = kfp.Client(host=HOST, \n",
    "                        client_id=CLIENT_ID,\n",
    "                        other_client_id=OTHER_CLIENT_ID, \n",
    "                        other_client_secret=OTHER_CLIENT_SECRET)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Start with a simple function"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#Define a Python function\n",
    "def add(a: float, b: float) -> float:\n",
    "   '''Calculates sum of two arguments'''\n",
    "   return a + b"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Convert the function to a pipeline operation"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "add_op = comp.func_to_container_op(add)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## A more complex example, with multiple outputs"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Advanced function\n",
    "# Demonstrates imports, helper functions and multiple outputs\n",
    "from typing import NamedTuple\n",
    "\n",
    "def my_divmod(dividend: float, \n",
    "              divisor: float,\n",
    "             ) -> NamedTuple('MyDivmodOutput', [('quotient', float), ('remainder', float), \n",
    "                                                ('mlpipeline_ui_metadata', 'UI_metadata'), \n",
    "                                                ('mlpipeline_metrics', 'Metrics')]):\n",
    "    \n",
    "    '''Divides two numbers and calculate  the quotient and remainder'''\n",
    "    \n",
    "    #Imports inside a component function:\n",
    "    import numpy as np\n",
    "\n",
    "    #This function demonstrates how to use nested functions inside a component function:\n",
    "    def divmod_helper(dividend, divisor):\n",
    "        return np.divmod(dividend, divisor)\n",
    "\n",
    "    (quotient, remainder) = divmod_helper(dividend, divisor)\n",
    "\n",
    "    import json\n",
    "    \n",
    "    # Exports a sample tensorboard:\n",
    "    metadata = {\n",
    "      'outputs' : [{\n",
    "        'type': 'tensorboard',\n",
    "        'source': 'gs://ml-pipeline-dataset/tensorboard-train',\n",
    "      }]\n",
    "    }\n",
    "\n",
    "    # Exports two sample metrics:\n",
    "    metrics = {\n",
    "      'metrics': [{\n",
    "          'name': 'quotient',\n",
    "          'numberValue':  float(quotient),\n",
    "        },{\n",
    "          'name': 'remainder',\n",
    "          'numberValue':  float(remainder),\n",
    "        }]}\n",
    "\n",
    "    from collections import namedtuple\n",
    "    divmod_output = namedtuple('MyDivmodOutput', \n",
    "                               ['quotient', 'remainder', 'mlpipeline_ui_metadata', 'mlpipeline_metrics'])\n",
    "    return divmod_output(quotient, remainder, json.dumps(metadata), json.dumps(metrics))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "my_divmod(100, 7)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "divmod_op = comp.func_to_container_op(func=my_divmod, \n",
    "                                      base_image=\"tensorflow/tensorflow:1.15.0-py3\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Define the pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import kfp.dsl as dsl\n",
    "@dsl.pipeline(\n",
    "    name='Calculation pipeline',\n",
    "    description='A toy pipeline that performs arithmetic calculations.'\n",
    ")\n",
    "def calc_pipeline(\n",
    "    a='a',\n",
    "    b='7',\n",
    "    c='17',\n",
    "):\n",
    "    #Passing pipeline parameter and a constant value as operation arguments\n",
    "    add_task = add_op(a, 4) #Returns a dsl.ContainerOp class instance. \n",
    "    \n",
    "    #Passing a task output reference as operation arguments\n",
    "    #For an operation with a single return value, the output reference can be accessed using `task.output` or `task.outputs['output_name']` syntax\n",
    "    divmod_task = divmod_op(add_task.output, b)\n",
    "\n",
    "    #For an operation with a multiple return values, the output references can be accessed using `task.outputs['output_name']` syntax\n",
    "    result_task = add_op(divmod_task.outputs['quotient'], c)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Submit the pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "pipeline_func = calc_pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "experiment_name = 'python-functions'\n",
    "\n",
    "#Specify pipeline argument values\n",
    "arguments = {'a': '7', 'b': '8'}\n",
    "\n",
    "run_name = pipeline_func.__name__ + ' run'\n",
    "\n",
    "# Submit pipeline directly from pipeline function\n",
    "run_result = client.create_run_from_pipeline_func(pipeline_func, \n",
    "                                                  experiment_name=experiment_name, \n",
    "                                                  run_name=run_name, \n",
    "                                                  arguments=arguments)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Train a keras model\n",
    "\n",
    "This following steps trains a neural network model to classify hand writing images using the [MNIST dataset](http://yann.lecun.com/exdb/mnist/)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def mnist_train(model_file: str, bucket: str) -> str:\n",
    "\n",
    "    from datetime import datetime\n",
    "    import tensorflow as tf\n",
    "    \n",
    "    model = tf.keras.models.Sequential([\n",
    "      tf.keras.layers.Flatten(input_shape=(28, 28)),\n",
    "      tf.keras.layers.Dense(512, activation=tf.nn.relu),\n",
    "      tf.keras.layers.Dropout(0.2),\n",
    "      tf.keras.layers.Dense(10, activation=tf.nn.softmax)\n",
    "    ])\n",
    "    \n",
    "    model.compile(optimizer='adam',\n",
    "                  loss='sparse_categorical_crossentropy',\n",
    "                  metrics=['accuracy'])\n",
    "    \n",
    "    print(model.summary())    \n",
    "    \n",
    "    mnist = tf.keras.datasets.mnist\n",
    "    (x_train, y_train),(x_test, y_test) = mnist.load_data()\n",
    "    x_train, x_test = x_train / 255.0, x_test / 255.0\n",
    "\n",
    "    callbacks = [\n",
    "      tf.keras.callbacks.TensorBoard(log_dir=bucket + '/logs/' + datetime.now().date().__str__()),\n",
    "      # Interrupt training if `val_loss` stops improving for over 2 epochs\n",
    "      tf.keras.callbacks.EarlyStopping(patience=2, monitor='val_loss'),\n",
    "    ]\n",
    "    \n",
    "    model.fit(x_train, y_train, batch_size=32, epochs=5, callbacks=callbacks,\n",
    "              validation_data=(x_test, y_test))\n",
    "    \n",
    "    \n",
    "    model.save(model_file)\n",
    "    \n",
    "    from tensorflow import gfile\n",
    "    \n",
    "    gcs_path = bucket + \"/\" + model_file\n",
    "    \n",
    "    if gfile.Exists(gcs_path):\n",
    "        gfile.Remove(gcs_path)\n",
    "    \n",
    "    gfile.Copy(model_file, gcs_path)\n",
    "    \n",
    "    return gcs_path"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "scrolled": false
   },
   "outputs": [],
   "source": [
    "mnist_train(model_file='mnist_model.h5', \n",
    "             bucket=GCS_BUCKET)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "model_train_op = comp.func_to_container_op(func=mnist_train, \n",
    "                                           base_image=\"tensorflow/tensorflow:1.15.0-py3\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Define and submit the pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "@dsl.pipeline(\n",
    "   name='Mnist pipeline',\n",
    "   description='A toy pipeline that performs mnist model training.'\n",
    ")\n",
    "def mnist_pipeline(\n",
    "    model_file: str = 'mnist_model.h5', \n",
    "    bucket: str = GCS_BUCKET\n",
    "):\n",
    "    model_train_op(model_file=model_file, bucket=bucket).apply(gcp.use_gcp_secret('user-gcp-sa'))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Submit a pipeline run"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "pipeline_func = mnist_pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "experiment_name = 'minist_kubeflow'\n",
    "\n",
    "arguments = {\"model_file\":\"mnist_model.h5\",\n",
    "             \"bucket\":GCS_BUCKET}\n",
    "\n",
    "run_name = pipeline_func.__name__ + ' run'\n",
    "\n",
    "# Submit pipeline directly from pipeline function\n",
    "run_result = client.create_run_from_pipeline_func(pipeline_func, \n",
    "                                                  experiment_name=experiment_name, \n",
    "                                                  run_name=run_name, \n",
    "                                                  arguments=arguments)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**As an alternative, you can compile the pipeline into a package.** The compiled pipeline can be easily shared and reused by others to run the pipeline.\n",
    "\n",
    "```python\n",
    "pipeline_filename = pipeline_func.__name__ + '.pipeline.zip'\n",
    "compiler.Compiler().compile(pipeline_func, pipeline_filename)\n",
    "\n",
    "experiment = client.create_experiment('python-functions-mnist')\n",
    "\n",
    "run_result = client.run_pipeline(\n",
    "    experiment_id=experiment.id, \n",
    "    job_name=run_name, \n",
    "    pipeline_package_path=pipeline_filename, \n",
    "    params=arguments)\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "virtualPython35",
   "language": "python",
   "name": "virtualpython35"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.5.7"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
